###
### create datasets (1 per day) of all news viewing intervals
### at household level
###

library(dplyr)
library(purrr)
library(tidyr)
library(forcats)
library(stringr)
library(readr)
library(tibble)
library(data.table)
library(parallel)
library(lubridate)
library(hms)
library(magrittr)

setDTthreads(1)

### SET WORKING DIRECTORY HERE 
path_to_archive <- "replication/"
setwd(path_to_archive)


# for timezone adjustments
load("data/dma_timezone.RData")
dma_timezone <- dma_timezone %>% 
	select(dma_code, timezone) %>%
	mutate(timezone = recode(timezone, ETZ = "America/New_York", CTZ = "America/Chicago", PTZ="America/Los_Angeles")) %>%
	filter(!is.na(dma_code)) %>%
	as.data.table


## get all news programs ##
load("data/stb/news_program_events_dt.RData")
news_schedule[,air_time_utc_end := air_time_utc + dminutes(duration)]

rawfile_name <- function(date) paste("rawxml/FWM_", year(date), str_pad(month(date), pad="0", width=2), str_pad(day(date), pad="0", width=2), "_R.pd.gz", sep="")
reffile_name <- function(date) paste("ref_data/fwm_ref_data_", year(date), str_pad(month(date), pad="0", width=2), str_pad(day(date), pad="0", width=2), ".RData", sep="")

intersect_view <- function(beg, end, chan, tz, tune_data) {	
	
	overlaps <- tune_data[timezone==tz & channel == chan & event_time_utc < end & event_time_utc_end >= beg]
	overlaps[, event_time_utc := pmax(beg, event_time_utc)]
	overlaps[, event_time_utc_end := pmin(end, event_time_utc_end)]
	overlaps
}

save_news_intervals <- function(d) {

	cat(as.character(d), "\n")
	cat("\tLoading T/C group definitions...\n")

	if (d < mdy("09/01/2012")){
		t_c_tmw <- readRDS(paste0("dd/tc_groups_", as.character(mdy("09/01/2012")), ".rds"))	
		t_c_tday <- t_c_tmw
		t_c_yday <- t_c_tmw
	} else if (d == mdy("09/01/2012")) {
		t_c_tday <- readRDS(paste0("dd/tc_groups_", as.character(d), ".rds"))
		t_c_yday <- t_c_tday
		t_c_tmw <- readRDS(paste0("dd/tc_groups_", as.character(d+1), ".rds"))	
	} else if (d == mdy("11/06/2012")) {
		t_c_tday <- readRDS(paste0("dd/tc_groups_", as.character(d), ".rds"))
		t_c_yday <- readRDS(paste0("dd/tc_groups_", as.character(d-1), ".rds"))
		t_c_tmw <- t_c_tday
	} else if (d > mdy("11/06/2012")) {
		t_c_yday <- readRDS(paste0("dd/tc_groups_", as.character(mdy("11/06/2012")), ".rds"))
		t_c_tday <- t_c_yday
		t_c_tmw <- t_c_yday
	} else {
		t_c_tday <- readRDS(paste0("dd/tc_groups_", as.character(d), ".rds"))
		t_c_yday <- readRDS(paste0("dd/tc_groups_", as.character(d-1), ".rds"))
		t_c_tmw <- readRDS(paste0("dd/tc_groups_", as.character(d+1), ".rds"))
	}
	
	t_c_by_ad <- bind_rows(t_c_yday, t_c_tday, t_c_tmw)

	# construct pre, post news consumption
	# for the union of devices in all T / C groups
	dev_list <- t_c_by_ad$t_c %>% rbindlist %$% unique(device_id) 
	
	cat("\tReading raw data...\n")	
	raw_view <- d %>% rawfile_name %>%
		fread( 
         sep="|", 
         col.names = c("mso", "device_id", "event_date", "event_time", "event_type", "event_value", "event_name", "event_id"),
         colClasses="character",
         showProgress=F
        ) %>%
        .[device_id %in% dev_list,] %>%
		setnames(old=c("event_name", "event_value"), new=c("channel", "channel_num"))

	cat("\tConverting times and dates...\n")
	raw_view[,event_date := ymd(event_date)]
    raw_view[,event_time := hms(hours=as.numeric(substr(event_time,1,2)), minutes=as.numeric(substr(event_time, 3,4)), seconds=as.numeric(substr(event_time, 5,6)))]

	raw_view[channel_num == "65532", event_type := "O"]
	raw_view[event_type != "T", channel := "OFF"]

	ref_day <- reffile_name(d) %>% readRDS %>% 
		as.data.table %>%
		.[,c("device_id","dma_code","zipcode")] %>%
		.[dma_timezone, on="dma_code"]

	raw_view <- ref_day[raw_view,on="device_id"]
	
	# replace missing dmas with LEXINGTON KY (541)
	raw_view[is.na(dma_code), dma_code:="541"]
	raw_view[is.na(timezone), timezone:="America/New_York"]

	raw_view[timezone == "America/New_York", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/New_York") %>% with_tz("UTC")]
	raw_view[timezone == "America/Chicago", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/Chicago") %>% with_tz("UTC")]
	raw_view[timezone == "America/Los_Angeles", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/Los_Angeles") %>% with_tz("UTC")]
	raw_view[is.na(event_time_utc), event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/New_York") %>% with_tz("UTC")]


	cat("\tAppending end-of-day events...\n")
	end_of_day <- data.table(
		event_date = d,
		event_time = hms(hours=rep(23,3),minutes=rep(59,3),seconds=rep(59,3)),
		timezone = c("America/New_York", "America/Chicago", "America/Los_Angeles"))
	end_of_day[, event_time_utc := imap(timezone, ~ ymd_hms(paste(event_date[.y], event_time[.y], sep=" "), tz = .x) %>% with_tz("UTC")) %>% reduce(c) ]


	every_dev_end <- unique(raw_view, by = "device_id")[,.(device_id,dma_code,zipcode,timezone,mso,event_id)]
	every_dev_end[,event_type:="E"]
	every_dev_end[,event_id:=paste(device_id, "END", sep="_")]
	every_dev_end[,channel_num:=0]
	every_dev_end[,channel:="END"]

	every_dev_end <- end_of_day[every_dev_end, on="timezone"]

	raw_view <- rbind(raw_view, every_dev_end)
	raw_view <- raw_view[order(device_id, event_time_utc),]

	cat("\tGenerating viewing intervals...\n")
	raw_view[, event_time_utc_end := lead(event_time_utc), by = .(device_id)]
	raw_view <- raw_view[!is.na(event_time_utc_end)]

	## cap segment time at 5h (18000s), ~ 99th percentile
	raw_view[event_type=="T", dur := time_length(event_time_utc_end - event_time_utc, unit="second")]
	raw_view[dur > 18000, `:=` (dur = 18000, event_time_utc_end = event_time_utc + dseconds(18000)) ]

	raw_view <- raw_view[event_type=="T" & dur >= 1]

	gc(verbose=F)

	cat("\tIntersecting with news intervals...\n")
	setkeyv(raw_view, c("channel", "event_time_utc", "event_time_utc_end"))
	news_schedule_day <- news_schedule[air_time_utc <= max(raw_view$event_time_utc_end) & air_time_utc_end >= min(raw_view$event_time_utc),]
	
	news_view <- pmap(list(beg=news_schedule_day$air_time_utc, 
					  end=news_schedule_day$air_time_utc_end,
					  chan=news_schedule_day$channel,
					  tz=news_schedule_day$timezone),
					  intersect_view,
					  tune_data=raw_view) %>%
				 rbindlist %>%
				 unique    # make sure we don't create any duplicate view intervals if there are overlapping entries in progs data

	news_view %>% saveRDS(file=paste0("data/news_intervals/news_intervals_", as.character(d), ".rds"))

}


dates <- seq(mdy("08/31/2012"), mdy("11/07/2012"), by = "days")

dates_split <- split(dates, f = 1:20)
mclapply(dates_split, FUN = function(d) walk(d, save_news_intervals), mc.cores=20)
